HBASE-12993 Use HBase 1.0 interfaces in hbase-thrift (Solomon Duskis)

This commit is contained in:
tedyu 2015-03-29 06:45:49 -07:00
parent ced0e324a1
commit bfb04d26a7
2 changed files with 65 additions and 53 deletions

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
@ -151,6 +152,13 @@ public class ConnectionCache {
return connInfo.connection.getTable(TableName.valueOf(tableName)); return connInfo.connection.getTable(TableName.valueOf(tableName));
} }
/**
* Retrieve a regionLocator for the table. The user should close the RegionLocator.
*/
public RegionLocator getRegionLocator(byte[] tableName) throws IOException {
return getCurrentConnection().connection.getRegionLocator(TableName.valueOf(tableName));
}
/** /**
* Get the cached connection for the current user. * Get the cached connection for the current user.
* If none or timed out, create a new one. * If none or timed out, create a new one.

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
@ -67,13 +68,14 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.OperationWithAttributes; import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.ParseFilter; import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.PrefixFilter;
@ -624,7 +626,7 @@ public class ThriftServerRunner implements Runnable {
/** /**
* The HBaseHandler is a glue object that connects Thrift RPC calls to the * The HBaseHandler is a glue object that connects Thrift RPC calls to the
* HBase client API primarily defined in the HBaseAdmin and HTable objects. * HBase client API primarily defined in the Admin and Table objects.
*/ */
public static class HBaseHandler implements Hbase.Iface { public static class HBaseHandler implements Hbase.Iface {
protected Configuration conf; protected Configuration conf;
@ -637,11 +639,11 @@ public class ThriftServerRunner implements Runnable {
private final ConnectionCache connectionCache; private final ConnectionCache connectionCache;
private static ThreadLocal<Map<String, HTable>> threadLocalTables = private static ThreadLocal<Map<String, Table>> threadLocalTables =
new ThreadLocal<Map<String, HTable>>() { new ThreadLocal<Map<String, Table>>() {
@Override @Override
protected Map<String, HTable> initialValue() { protected Map<String, Table> initialValue() {
return new TreeMap<String, HTable>(); return new TreeMap<String, Table>();
} }
}; };
@ -651,12 +653,12 @@ public class ThriftServerRunner implements Runnable {
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime"; static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
/** /**
* Returns a list of all the column families for a given htable. * Returns a list of all the column families for a given Table.
* *
* @param table * @param table
* @throws IOException * @throws IOException
*/ */
byte[][] getAllColumns(HTable table) throws IOException { byte[][] getAllColumns(Table table) throws IOException {
HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies(); HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
byte[][] columns = new byte[cds.length][]; byte[][] columns = new byte[cds.length][];
for (int i = 0; i < cds.length; i++) { for (int i = 0; i < cds.length; i++) {
@ -667,25 +669,25 @@ public class ThriftServerRunner implements Runnable {
} }
/** /**
* Creates and returns an HTable instance from a given table name. * Creates and returns a Table instance from a given table name.
* *
* @param tableName * @param tableName
* name of table * name of table
* @return HTable object * @return Table object
* @throws IOException * @throws IOException
* @throws IOError * @throws IOError
*/ */
public HTable getTable(final byte[] tableName) throws public Table getTable(final byte[] tableName) throws
IOException { IOException {
String table = Bytes.toString(tableName); String table = Bytes.toString(tableName);
Map<String, HTable> tables = threadLocalTables.get(); Map<String, Table> tables = threadLocalTables.get();
if (!tables.containsKey(table)) { if (!tables.containsKey(table)) {
tables.put(table, (HTable)connectionCache.getTable(table)); tables.put(table, (Table)connectionCache.getTable(table));
} }
return tables.get(table); return tables.get(table);
} }
public HTable getTable(final ByteBuffer tableName) throws IOException { public Table getTable(final ByteBuffer tableName) throws IOException {
return getTable(getBytes(tableName)); return getTable(getBytes(tableName));
} }
@ -770,7 +772,7 @@ public class ThriftServerRunner implements Runnable {
@Override @Override
public boolean isTableEnabled(ByteBuffer tableName) throws IOError { public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
try { try {
return HTable.isTableEnabled(this.conf, getBytes(tableName)); return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(e.getMessage()); throw new IOError(e.getMessage());
@ -824,20 +826,12 @@ public class ThriftServerRunner implements Runnable {
@Override @Override
public List<TRegionInfo> getTableRegions(ByteBuffer tableName) public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
throws IOError { throws IOError {
try { try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
HTable table; List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
try {
table = getTable(tableName);
} catch (TableNotFoundException ex) {
return new ArrayList<TRegionInfo>();
}
Map<HRegionInfo, ServerName> regionLocations =
table.getRegionLocations();
List<TRegionInfo> results = new ArrayList<TRegionInfo>(); List<TRegionInfo> results = new ArrayList<TRegionInfo>();
for (Map.Entry<HRegionInfo, ServerName> entry : for (HRegionLocation regionLocation : regionLocations) {
regionLocations.entrySet()) { HRegionInfo info = regionLocation.getRegionInfo();
HRegionInfo info = entry.getKey(); ServerName serverName = regionLocation.getServerName();
ServerName serverName = entry.getValue();
TRegionInfo region = new TRegionInfo(); TRegionInfo region = new TRegionInfo();
region.serverName = ByteBuffer.wrap( region.serverName = ByteBuffer.wrap(
Bytes.toBytes(serverName.getHostname())); Bytes.toBytes(serverName.getHostname()));
@ -888,7 +882,7 @@ public class ThriftServerRunner implements Runnable {
byte[] qualifier, byte[] qualifier,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError { Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Get get = new Get(getBytes(row)); Get get = new Get(getBytes(row));
addAttributes(get, attributes); addAttributes(get, attributes);
if (qualifier == null) { if (qualifier == null) {
@ -930,7 +924,7 @@ public class ThriftServerRunner implements Runnable {
public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family, public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Get get = new Get(getBytes(row)); Get get = new Get(getBytes(row));
addAttributes(get, attributes); addAttributes(get, attributes);
if (null == qualifier) { if (null == qualifier) {
@ -973,7 +967,7 @@ public class ThriftServerRunner implements Runnable {
byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
throws IOError { throws IOError {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Get get = new Get(getBytes(row)); Get get = new Get(getBytes(row));
addAttributes(get, attributes); addAttributes(get, attributes);
if (null == qualifier) { if (null == qualifier) {
@ -1021,7 +1015,7 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns, ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
if (columns == null) { if (columns == null) {
Get get = new Get(getBytes(row)); Get get = new Get(getBytes(row));
addAttributes(get, attributes); addAttributes(get, attributes);
@ -1084,7 +1078,7 @@ public class ThriftServerRunner implements Runnable {
Map<ByteBuffer, ByteBuffer> attributes) throws IOError { Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try { try {
List<Get> gets = new ArrayList<Get>(rows.size()); List<Get> gets = new ArrayList<Get>(rows.size());
HTable table = getTable(tableName); Table table = getTable(tableName);
if (metrics != null) { if (metrics != null) {
metrics.incNumRowKeysInBatchGet(rows.size()); metrics.incNumRowKeysInBatchGet(rows.size());
} }
@ -1128,7 +1122,7 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer column, ByteBuffer column,
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Delete delete = new Delete(getBytes(row)); Delete delete = new Delete(getBytes(row));
addAttributes(delete, attributes); addAttributes(delete, attributes);
byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
@ -1157,7 +1151,7 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer tableName, ByteBuffer row, long timestamp, ByteBuffer tableName, ByteBuffer row, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError { Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Delete delete = new Delete(getBytes(row), timestamp); Delete delete = new Delete(getBytes(row), timestamp);
addAttributes(delete, attributes); addAttributes(delete, attributes);
table.delete(delete); table.delete(delete);
@ -1225,7 +1219,7 @@ public class ThriftServerRunner implements Runnable {
List<Mutation> mutations, long timestamp, List<Mutation> mutations, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, IllegalArgument { throws IOError, IllegalArgument {
HTable table = null; Table table = null;
try { try {
table = getTable(tableName); table = getTable(tableName);
Put put = new Put(getBytes(row), timestamp); Put put = new Put(getBytes(row), timestamp);
@ -1327,7 +1321,7 @@ public class ThriftServerRunner implements Runnable {
puts.add(put); puts.add(put);
} }
HTable table = null; Table table = null;
try { try {
table = getTable(tableName); table = getTable(tableName);
if (!puts.isEmpty()) if (!puts.isEmpty())
@ -1359,7 +1353,7 @@ public class ThriftServerRunner implements Runnable {
protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
byte [] family, byte [] qualifier, long amount) byte [] family, byte [] qualifier, long amount)
throws IOError, IllegalArgument, TException { throws IOError, IllegalArgument, TException {
HTable table; Table table;
try { try {
table = getTable(tableName); table = getTable(tableName);
return table.incrementColumnValue( return table.incrementColumnValue(
@ -1417,7 +1411,7 @@ public class ThriftServerRunner implements Runnable {
Map<ByteBuffer, ByteBuffer> attributes) Map<ByteBuffer, ByteBuffer> attributes)
throws IOError { throws IOError {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Scan scan = new Scan(); Scan scan = new Scan();
addAttributes(scan, attributes); addAttributes(scan, attributes);
if (tScan.isSetStartRow()) { if (tScan.isSetStartRow()) {
@ -1465,7 +1459,7 @@ public class ThriftServerRunner implements Runnable {
List<ByteBuffer> columns, List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError { Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow)); Scan scan = new Scan(getBytes(startRow));
addAttributes(scan, attributes); addAttributes(scan, attributes);
if(columns != null && columns.size() != 0) { if(columns != null && columns.size() != 0) {
@ -1491,7 +1485,7 @@ public class ThriftServerRunner implements Runnable {
Map<ByteBuffer, ByteBuffer> attributes) Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException { throws IOError, TException {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
addAttributes(scan, attributes); addAttributes(scan, attributes);
if(columns != null && columns.size() != 0) { if(columns != null && columns.size() != 0) {
@ -1518,7 +1512,7 @@ public class ThriftServerRunner implements Runnable {
Map<ByteBuffer, ByteBuffer> attributes) Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException { throws IOError, TException {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Scan scan = new Scan(getBytes(startAndPrefix)); Scan scan = new Scan(getBytes(startAndPrefix));
addAttributes(scan, attributes); addAttributes(scan, attributes);
Filter f = new WhileMatchFilter( Filter f = new WhileMatchFilter(
@ -1546,7 +1540,7 @@ public class ThriftServerRunner implements Runnable {
List<ByteBuffer> columns, long timestamp, List<ByteBuffer> columns, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException { Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow)); Scan scan = new Scan(getBytes(startRow));
addAttributes(scan, attributes); addAttributes(scan, attributes);
scan.setTimeRange(0, timestamp); scan.setTimeRange(0, timestamp);
@ -1573,7 +1567,7 @@ public class ThriftServerRunner implements Runnable {
Map<ByteBuffer, ByteBuffer> attributes) Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException { throws IOError, TException {
try { try {
HTable table = getTable(tableName); Table table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
addAttributes(scan, attributes); addAttributes(scan, attributes);
scan.setTimeRange(0, timestamp); scan.setTimeRange(0, timestamp);
@ -1602,7 +1596,7 @@ public class ThriftServerRunner implements Runnable {
TreeMap<ByteBuffer, ColumnDescriptor> columns = TreeMap<ByteBuffer, ColumnDescriptor> columns =
new TreeMap<ByteBuffer, ColumnDescriptor>(); new TreeMap<ByteBuffer, ColumnDescriptor>();
HTable table = getTable(tableName); Table table = getTable(tableName);
HTableDescriptor desc = table.getTableDescriptor(); HTableDescriptor desc = table.getTableDescriptor();
for (HColumnDescriptor e : desc.getFamilies()) { for (HColumnDescriptor e : desc.getFamilies()) {
@ -1621,8 +1615,7 @@ public class ThriftServerRunner implements Runnable {
public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row, public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
ByteBuffer family) throws IOError { ByteBuffer family) throws IOError {
try { try {
HTable table = getTable(getBytes(tableName)); Result result = getRowOrBefore(getBytes(tableName), getBytes(row), getBytes(family));
Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
return ThriftUtilities.cellFromHBase(result.rawCells()); return ThriftUtilities.cellFromHBase(result.rawCells());
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
@ -1633,10 +1626,9 @@ public class ThriftServerRunner implements Runnable {
@Override @Override
public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError { public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
try { try {
HTable table = getTable(TableName.META_TABLE_NAME.getName());
byte[] row = getBytes(searchRow); byte[] row = getBytes(searchRow);
Result startRowResult = table.getRowOrBefore( Result startRowResult =
row, HConstants.CATALOG_FAMILY); getRowOrBefore(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
if (startRowResult == null) { if (startRowResult == null) {
throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row=" throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
@ -1670,6 +1662,18 @@ public class ThriftServerRunner implements Runnable {
} }
} }
private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
Scan scan = new Scan(row);
scan.setReversed(true);
scan.addFamily(family);
scan.setStartRow(row);
Table table = getTable(tableName);
try (ResultScanner scanner = table.getScanner(scan)) {
return scanner.next();
}
}
private void initMetrics(ThriftMetrics metrics) { private void initMetrics(ThriftMetrics metrics) {
this.metrics = metrics; this.metrics = metrics;
} }
@ -1687,7 +1691,7 @@ public class ThriftServerRunner implements Runnable {
} }
try { try {
HTable table = getTable(tincrement.getTable()); Table table = getTable(tincrement.getTable());
Increment inc = ThriftUtilities.incrementFromThrift(tincrement); Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
table.increment(inc); table.increment(inc);
} catch (IOException e) { } catch (IOException e) {
@ -1714,7 +1718,7 @@ public class ThriftServerRunner implements Runnable {
} }
try { try {
HTable table = getTable(tappend.getTable()); Table table = getTable(tappend.getTable());
Append append = ThriftUtilities.appendFromThrift(tappend); Append append = ThriftUtilities.appendFromThrift(tappend);
Result result = table.append(append); Result result = table.append(append);
return ThriftUtilities.cellFromHBase(result.rawCells()); return ThriftUtilities.cellFromHBase(result.rawCells());
@ -1744,7 +1748,7 @@ public class ThriftServerRunner implements Runnable {
throw new IllegalArgument(e.getMessage()); throw new IllegalArgument(e.getMessage());
} }
HTable table = null; Table table = null;
try { try {
table = getTable(tableName); table = getTable(tableName);
byte[][] famAndQf = KeyValue.parseColumn(getBytes(column)); byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));